package cn.lsh.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CountProducerInterceptor implements ProducerInterceptor<Integer, String> {
	private int success = 0;
	private int fail = 0;

	@Override
	public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> producerRecord) {
		return producerRecord;
	}

	@Override
	public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
		if (e == null) {
			fail++;
		} else {
			success++;
		}
	}

	@Override
	public void close() {
		System.out.println("发送消息成功条数：" + success);
		System.out.println("发送消息失败条数：" + fail);
	}

	@Override
	public void configure(Map<String, ?> map) {

	}
}
